package org.luo.lan.client.handlers;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.luo.lan.client.ClientApplication;
import org.luo.lan.client.config.ClientConfig;
import org.luo.lan.client.constants.AttrConstants;
import org.luo.lan.client.factory.SessionFactory;
import org.luo.lan.common.WaitWriteTask;
import org.luo.lan.common.handler.AbstractHanderController;
import org.luo.lan.common.handler.ControlTypeConstant;
import org.luo.lan.common.handler.Controller;
import org.luo.lan.common.handler.Request;
import org.luo.lan.common.util.StringUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @Auther: luobiao
 * @Date: 2020/9/19 09:15
 * @Description:
 */
@Slf4j
public class ClientBridgeHandler extends SimpleChannelInboundHandler<Request> {
    private static final int MAX_HEART_BEAT_FAIL_TIMES = 3; //最大心跳超时失败次数
    private static final int HEART_BEAT_INTERVAL_TIME = 5; //心跳间隔时间
    private static final long RE_AUTH_TIME = 3 * 1000;
    private AtomicInteger failTimes = new AtomicInteger(0);
    private final ClientApplication application;
    private Map<String, BlockingQueue<Runnable>> queueMap = new HashMap<>();
    private ReentrantLock createChannelLock = new ReentrantLock();

    private Controller controller;

    public ClientBridgeHandler(ClientApplication application) {
        this.application = application;
        this.controller = new ClientBridgeController();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Request request) {
        this.controller.control(ctx, request);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.pipeline().addFirst("idle", new IdleStateHandler(0, 0, HEART_BEAT_INTERVAL_TIME));
        ctx.writeAndFlush(Request.SUCCESS(ControlTypeConstant.AUTH, ClientConfig.getInstance().getKey()));
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.ALL_IDLE) {
                ctx.writeAndFlush(Request.SUCCESS(ControlTypeConstant.HEARTBEAT, "PING"));
                if (failTimes.get() >= MAX_HEART_BEAT_FAIL_TIMES) {
                    log.debug("超过最大心跳超时次数，关闭客户端桥接通道！");
                    ctx.close();//关闭后会执行channelInactive进行重连
                } else {
                    failTimes.incrementAndGet();
                    //log.debug("累加失败次数，当前值为"+failTimes.get());
                }
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.debug("客户端桥接通道关闭中...，即将重连");
        SessionFactory.INSTANCE.closeAllSeesionChannel();
        application.closeProxyChannle();
        ctx.channel().eventLoop().execute(() -> {
            application.connect();
        });
        super.channelInactive(ctx);
    }


    public class ClientBridgeController extends AbstractHanderController<ChannelHandlerContext> {

        @Override
        public void auth(ChannelHandlerContext ctx, Request request) {
            if (request.isSuccess()) {
                log.debug("服务器授权成功！");
            } else {
                log.error("服务器授权失败，失败原因：{}", request.getStringBody() + "，将于" + RE_AUTH_TIME / 1000 + "s后重新发起授权申请！");
                ctx.channel().eventLoop().schedule(() -> {
                    ctx.writeAndFlush(Request.SUCCESS(ControlTypeConstant.AUTH, ClientConfig.getInstance().getKey()));
                }, RE_AUTH_TIME, TimeUnit.MILLISECONDS);
                //ctx.channel().close();
            }
        }

        @Override
        public void transport(ChannelHandlerContext ctx, Request request){
            ByteBuf byteBuf = Unpooled.wrappedBuffer(request.getBody());
            String[] strArr = request.getStringHeader().split("@");
            String sessionKey = strArr[0];
            String target = strArr[1];
            NioSocketChannel targetChannel = SessionFactory.INSTANCE.getSessionChannel(sessionKey);
            if (targetChannel == null) {
                String host = target.split(":")[0];
                int port = Integer.valueOf(target.split(":")[1]);
                try{
                    if(createChannelLock.tryLock(3, TimeUnit.SECONDS)){
                        targetChannel = SessionFactory.INSTANCE.getSessionChannel(sessionKey);
                        if (targetChannel == null) { //再次判断proxyChannel是否有值，因为可能在上次判断到获取锁前有线程已经创建好proxyChannel了
                            if (queueMap.containsKey(sessionKey)) {//如果queueMap中有该sessionKey了则说明有线程正在创建proxyChannel，则添加到queue中
                                BlockingQueue<Runnable> queue=queueMap.get(sessionKey);
                                if (queue != null) {
                                    queue.add(new WaitWriteTask(()->{
                                        NioSocketChannel channel = SessionFactory.INSTANCE.getSessionChannel(sessionKey);
                                        channel.writeAndFlush(byteBuf);
                                    }));
                                }
                            }else{
                                queueMap.put(sessionKey, new LinkedBlockingQueue<>());
                                SessionFactory.INSTANCE.createSession(ctx, host, port, sessionKey, (p -> {
                                    if (p == null) {
                                        log.error("创建目标会话channel失败，sessionKey={}",sessionKey);
                                        ctx.channel().writeAndFlush(Request.SUCCESS(ControlTypeConstant.CLOSE_SESSION, "客户端创建目标会话通道失败，请求关闭服务器会话通道！").addHeader(sessionKey));
                                        queueMap.remove(sessionKey);
                                    }else{
                                        p.attr(AttrConstants.SEESION_KEY).set(sessionKey);
                                        p.attr(AttrConstants.BRIDGE_CHANNEL).set((NioSocketChannel)ctx.channel());
                                        p.attr(AttrConstants.IS_MANUAL_CLOSE).set(false);
                                        p.writeAndFlush(byteBuf);
                                        BlockingQueue<Runnable> queue=queueMap.get(sessionKey);
                                        while (queue.size() > 0) {
                                            try {
                                                queue.take().run(); //执行队列中的 channel.writeAndFlush(byteBuf);操作
                                            } catch (InterruptedException e) {
                                                log.error("出队线程被中断{}"+e);
                                            }
                                        }
                                        queueMap.remove(sessionKey);
                                    }
                                }));
                            }
                        }else{
                            targetChannel.writeAndFlush(byteBuf);
                        }
                    }else{
                        log.error("获取锁超时！");
                    }
                } catch (InterruptedException e) {
                    log.error("线程被中断！e={}",e);
                    queueMap.remove(sessionKey);//防止在刚刚put完线程被中断的情况无法回收sessionKey对应的Queue
                } finally {
                    createChannelLock.unlock();
                }
                return;
            }
            targetChannel.writeAndFlush(byteBuf);
        }

        @Override
        public void heartbeat(ChannelHandlerContext ctx, Request request) {
            //log.debug("客户端收到服务器心跳回应，重置失败次数为0！");
            failTimes.set(0);
        }

        @Override
        public void closeSession(ChannelHandlerContext ctx, Request request) {
            String sessionKey = request.getStringHeader();
            if (StringUtils.isNotBlank(sessionKey)) {
                log.debug("关闭sessionKey={}会话通道：{}",sessionKey,request.getStringBody());
                ctx.channel().attr(AttrConstants.IS_MANUAL_CLOSE).set(true);
                SessionFactory.INSTANCE.closeSeesionChannel(sessionKey);
            }
        }

        @Override
        public void proxy(ChannelHandlerContext ctx, Request request) {
            String sessionKey=request.getStringHeader();
            NioSocketChannel proxyChannel=SessionFactory.INSTANCE.getSessionChannel(sessionKey);
            if (proxyChannel != null) {
                String response=request.getStringBody();
                log.debug("sessionkey={}代理响应的channel={}，：",sessionKey,proxyChannel);
                /*log.debug("---------------------------------代理响应内容begin---------------------------------------");
                log.debug(response);
                log.debug("-----------------------------------代理响应内容end-------------------------------------");*/
                ByteBuf byteBuf = Unpooled.wrappedBuffer(request.getBody());
                proxyChannel.writeAndFlush(byteBuf);
            }else{
                log.info("会话不存在！sessionKey={}，ctx={}",sessionKey,ctx);
            }
        }
    }

}
